package cn.tiakon.dmp.report

import cn.tiakon.dmp.untils.{ContextUtils, Utils}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}

import scala.collection.mutable.ListBuffer

/**
  * 广告在 app 维度上的指标分布情况
  *
  * @author Tiakon
  *         2018/3/29 11:22
  */
object RepoADInApp {
  def main(args: Array[String]): Unit = {

    val parquetPath: String = Utils.load.getString("output.parquet.path")

    val sc = ContextUtils.getSparkContext()
    val sqlc: SQLContext = new SQLContext(sc)

    //    原始请求数 有效请求 广告请求 参与竞价数 竞价成功数  展示量 点击量  广告成本 广告消费
    //    requestmode	processnode	iseffective	isbilling	isbid	iswin	adorderid
    val dictRDD: RDD[String] = sc.textFile(Utils.load.getString("input.dict.path"))

    //    dictRDD.take(20).toBuffer.foreach(println)
    val filteredRDD: RDD[(String, String)] = dictRDD.filter(_.length > 0)
      .map(_.split("\t", -1))
      .filter(arr => arr.length > 5)
      .map(fields => (fields(4), fields(1)))

    //收集到Driver端
    val tuplesArrayInDriver: Array[(String, String)] = filteredRDD.collect()

    //生成广播变量
    val tuplesBroadcastUrl: Broadcast[Array[(String, String)]] = sc.broadcast(tuplesArrayInDriver)

    //取值
    val tuplesArrayInExecutor: Array[(String, String)] = tuplesBroadcastUrl.value

    val srcDataFrame: DataFrame = sqlc.read.parquet(parquetPath)

    //    srcDataFrame.show()

    val appInfo: RDD[((String, String), ListBuffer[Double])] = srcDataFrame.map(rdd => {

      val appid: String = rdd.getAs[String]("appid")
      val appname: String = rdd.getAs[String]("appname")

      val requestmode: Int = rdd.getAs[Int]("requestmode")
      val processnode: Int = rdd.getAs[Int]("processnode")

      val iseffective: Int = rdd.getAs[Int]("iseffective")
      val isbilling: Int = rdd.getAs[Int]("isbilling")
      val isbid: Int = rdd.getAs[Int]("isbid")
      val iswin: Int = rdd.getAs[Int]("iswin")

      val adorderid: Int = rdd.getAs[Int]("adorderid")

      val winprice: Double = rdd.getAs[Double]("winprice")
      val adpayment: Double = rdd.getAs[Double]("adpayment")

      //      (原始请求数 有效请求 广告请求)
      val validRreq: ListBuffer[Double] = if (requestmode == 1 && processnode >= 1)
        if (processnode >= 2) {
          if (processnode == 3) ListBuffer(1, 1, 1)
          else ListBuffer(1, 1, 0)
        } else ListBuffer(1, 0, 0)
      else ListBuffer(0, 0, 0)


      //      (参与竞价数)
      val bidding: List[Double] = if (iseffective == 1 && isbilling == 1 && isbid == 1 && adorderid != 0) List[Double](1)
      else List[Double](0)

      //      (竞价成功数,广告成本,广告消费)
      val BiddingCostsAndConsum: List[Double] = if (iseffective == 1 && isbilling == 1 && iswin == 1) List(1, 1, 1)
      else List(0, 0, 0)

      //      (展示量 点击量)
      val showAndClick: List[Double] = if (iseffective == 1 && requestmode == 2) {
        if (requestmode == 3) List(1, 1)
        else List(1, 0)
      } else List(0, 0)

      ((appid, appname), validRreq ++ bidding ++ BiddingCostsAndConsum ++ showAndClick)
    })




    //    tuplesArrayInExecutor.toBuffer.foreach(println)
//    appInfo.take(100).toBuffer.foreach(println)


    /*appInfo.map { t =>
      if (("其他".equals(t._2))) {

      }
    }*/

  }
}
